package priv.pfz.paxos.role;

import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import priv.pfz.paxos.network.RpcManager;
import priv.pfz.paxos.network.dto.AcceptReq;
import priv.pfz.paxos.network.dto.AcceptResp;
import priv.pfz.paxos.network.dto.PrepareReq;
import priv.pfz.paxos.network.dto.PrepareResp;

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * @author pengfangzhou
 * @date 2022/2/14 2:01
 */
@Slf4j
public class Proposer {
    private final String proposerId;
    @Getter
    private String proposalValue;
    private int proposalId = 0;
    private final List<PrepareResp> prepareResps = Lists.newArrayList();
    private final List<AcceptResp> acceptResps = Lists.newArrayList();
    private final AcceptorInfo acceptorInfo;
    private final RpcManager rpcManager;
    @Getter
    private boolean finish = false;

    public Proposer(String proposerId, String proposalValue, AcceptorInfo acceptorInfo, RpcManager rpcManager) {
        this.proposerId = proposerId;
        this.proposalValue = proposalValue;
        this.acceptorInfo = acceptorInfo;
        this.rpcManager = rpcManager;
    }

    /**
     * 进入prepare阶段
     */
    public synchronized void doPreparePhase() {
        proposalId++;
        log.info("[{}] begin new prepare phase {}", proposerId, proposalId);
        prepareResps.clear();
        for (Acceptor acceptor : acceptorInfo.getAcceptorMap().values()) {
            PrepareReq req = new PrepareReq();
            req.setProposalId(proposalId);
            rpcManager.prepare(this, acceptor, req, acceptor::handlePrepare, this::handlePrepareResp);
        }
    }

    private synchronized void handlePrepareResp(PrepareReq req, PrepareResp resp) {
        List<PrepareResp> validPrepareResps = addToResps(prepareResps, resp);
        if (validPrepareResps == null) {
            return;
        }

        if (validPrepareResps.size() < acceptorInfo.getMajority()) {
            //prepare失败，重新开始
            doPreparePhase();
        } else {
            //若返回值中有已accept的值，则从其中取proposalId最大的，覆盖自己提议的值
            Integer maxAcceptedProposal = null;
            for (PrepareResp prepareResp : validPrepareResps) {
                if (prepareResp.getAcceptedValue() == null) {
                    continue;
                }
                if (maxAcceptedProposal == null || prepareResp.getAcceptedProposalId() > maxAcceptedProposal) {
                    maxAcceptedProposal = prepareResp.getAcceptedProposalId();
                    proposalValue = prepareResp.getAcceptedValue();
                }
            }
            if (maxAcceptedProposal != null) {
                log.info("[{}] proposalValue update to {}", proposerId, proposalValue);
            }
            doAcceptPhase();
        }
    }

    /**
     * 进入accept阶段
     */
    private void doAcceptPhase() {
        log.info("[{}] begin accept phase {}", proposerId, proposalId);
        acceptResps.clear();
        for (Acceptor acceptor : acceptorInfo.getAcceptorMap().values()) {
            AcceptReq req = new AcceptReq();
            req.setProposalId(proposalId);
            req.setValue(proposalValue);
            rpcManager.accept(this, acceptor, req, acceptor::handleAccept, this::handleAcceptResp);
        }
    }

    private synchronized void handleAcceptResp(AcceptReq req, AcceptResp resp) {
        List<AcceptResp> validAcceptResps = addToResps(acceptResps, resp);
        if (validAcceptResps == null) {
            return;
        }

        if (validAcceptResps.size() < acceptorInfo.getMajority()) {
            //重新开始prepare阶段
            doPreparePhase();
        } else {
            //判断值被多数派确认
            boolean valueChosen = true;
            for (AcceptResp acceptResp : validAcceptResps) {
                if (acceptResp.getMinProposal() > proposalId) {
                    valueChosen = false;
                    break;
                }
            }
            if (valueChosen) {
                //进入learn阶段，略
                log.info("[{}] value {} is chosen.", proposerId, proposalValue);
                finish = true;
            } else {
                //重新开始prepare阶段
                doPreparePhase();
            }
        }
    }

    /**
     * 实际通信中有丢包或超时的情况，这里简化实现为proposer收齐所有响应后开始执行后续逻辑，resp为null的表示丢包或超时
     * proposer只处理最先收到的majority个有效响应
     */
    private <T> List<T> addToResps(List<T> resps, T resp) {
        resps.add(resp);
        if (resps.size() < acceptorInfo.getTotal()) {
            return null;
        }
        return resps.stream()
                .filter(Objects::nonNull)
                .limit(acceptorInfo.getMajority())
                .collect(Collectors.toList());
    }
}
